TimeSeriesSVRTslearn (tslearn-style) — Support Vector Regression for time series#
This notebook implements a tslearn-inspired time-series SVR wrapper:
- `TimeSeriesSVRTslearn(C=..., kernel=..., degree=..., ...)` supports the standard SVR kernels on flattened windows (`linear`/`poly`/`rbf`/`sigmoid`)
- it adds a time-series-specific option: a DTW-RBF kernel (`kernel="dtw_rbf"`)
The goal is to have a runnable reference even without tslearn installed.
SVR in one page (kernel view)#
Given training pairs \((x_i, y_i)\), SVR learns a function \(f(x)\) that is flat (small norm) while keeping most errors within an \(\varepsilon\)-tube.
In the kernelized form, predictions are:

$$\hat{y}(x) = \sum_{i=1}^{n} (\alpha_i - \alpha_i^*)\,K(x, x_i) + b,$$

where \(K(x,x') = \langle \Phi(x), \Phi(x')\rangle\) is a kernel.
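As a quick sanity check of this expansion (a minimal sketch on made-up toy data; the variable names here are illustrative), a fitted sklearn SVR prediction can be reconstructed from its `dual_coef_`, `support_vectors_`, and `intercept_` attributes:

import numpy as np
from sklearn.svm import SVR
from sklearn.metrics.pairwise import rbf_kernel
# toy regression problem (illustrative only)
_rng = np.random.default_rng(0)
X_toy = _rng.normal(size=(50, 4))
y_toy = np.sin(X_toy[:, 0]) + 0.1 * _rng.normal(size=50)
svr_toy = SVR(kernel="rbf", gamma=0.5, C=1.0).fit(X_toy, y_toy)
# dual_coef_ stores (alpha_i - alpha_i^*) for the support vectors; intercept_ is b
K_toy = rbf_kernel(X_toy, svr_toy.support_vectors_, gamma=0.5)
manual = K_toy @ svr_toy.dual_coef_.ravel() + svr_toy.intercept_[0]
print(np.allclose(manual, svr_toy.predict(X_toy)))  # expect True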
Why special kernels for time series?#
Flattening a window treats misalignments as large errors. For sequences that may be locally shifted/stretched, an alignment distance (DTW) can be more faithful.
We implement a simple DTW-RBF kernel:

$$K_{\text{DTW-RBF}}(x,x') = \exp\!\big(-\gamma\,\mathrm{DTW}(x,x')^2\big),$$

where \(\mathrm{DTW}(x,x')^2\) denotes the optimal alignment cost accumulated with squared Euclidean local distances (exactly what `dtw_cost` below computes).
Note: an RBF of a DTW distance is not guaranteed to be positive semidefinite (PSD) for all settings; empirically it often works well as a similarity for SVR.
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import os
import plotly.io as pio
from sklearn.svm import SVR
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from scipy import stats
pio.renderers.default = os.environ.get("PLOTLY_RENDERER", "notebook")
pio.templates.default = "plotly_white"
rng = np.random.default_rng(7)
import numpy, pandas, sklearn, scipy, plotly
print("numpy:", numpy.__version__)
print("pandas:", pandas.__version__)
print("sklearn:", sklearn.__version__)
print("scipy:", scipy.__version__)
print("plotly:", plotly.__version__)
numpy: 1.26.2
pandas: 2.1.3
sklearn: 1.6.0
scipy: 1.15.0
plotly: 6.5.2
def _as_3d_panel(X: np.ndarray) -> np.ndarray:
"""Accept (n, m) or (n, d, m). Return (n, d, m)."""
X = np.asarray(X, dtype=float)
if X.ndim == 2:
return X[:, None, :]
if X.ndim == 3:
return X
raise ValueError(f"X must be 2D or 3D, got shape={X.shape}")
def _flatten_panel(X3: np.ndarray) -> np.ndarray:
n, d, m = X3.shape
return X3.reshape(n, d * m)
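# Quick shape check (illustrative): a univariate batch (n, m) is promoted to
# (n, 1, m), and flattening gives the (n, d*m) matrix used by the standard kernels.
_demo = np.arange(12.0).reshape(3, 4)  # 3 windows of length 4
assert _as_3d_panel(_demo).shape == (3, 1, 4)
assert _flatten_panel(_as_3d_panel(_demo)).shape == (3, 4)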
def dtw_cost(a: np.ndarray, b: np.ndarray, *, window: int | None = None) -> float:
"""DTW squared cost between sequences a and b.
a, b: shape (m, d) (time-major).
window: Sakoe-Chiba radius in time steps (None -> unconstrained).
"""
a = np.asarray(a, dtype=float)
b = np.asarray(b, dtype=float)
if a.ndim == 1:
a = a.reshape(-1, 1)
if b.ndim == 1:
b = b.reshape(-1, 1)
n, da = a.shape
m, db = b.shape
if da != db:
raise ValueError("a and b must have the same dimensionality")
if window is None:
window = max(n, m)
window = int(max(window, abs(n - m)))
prev = np.full(m + 1, np.inf)
curr = np.full(m + 1, np.inf)
prev[0] = 0.0
for i in range(1, n + 1):
curr.fill(np.inf)
j_start = max(1, i - window)
j_end = min(m, i + window)
ai = a[i - 1]
for j in range(j_start, j_end + 1):
diff = ai - b[j - 1]
cost = float(np.dot(diff, diff))
curr[j] = cost + min(prev[j], curr[j - 1], prev[j - 1])
prev, curr = curr, prev
return float(prev[m])
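# Illustrative check: a small time shift inflates the flat (squared Euclidean)
# distance, while DTW can largely absorb it by warping the time axis.
_s = np.sin(np.linspace(0.0, 2.0 * np.pi, 40))
_s_shifted = np.roll(_s, 3)
print("squared Euclidean:", float(np.sum((_s - _s_shifted) ** 2)))
print("DTW cost:", dtw_cost(_s, _s_shifted, window=5))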
def pairwise_dtw_cost(
X: np.ndarray,
Y: np.ndarray | None = None,
*,
window: int | None = None,
) -> np.ndarray:
"""Pairwise DTW squared costs.
X: (n, m) or (n, d, m)
Y: (k, m) or (k, d, m)
Returns: (n, k)
"""
X3 = _as_3d_panel(X)
Y3 = X3 if Y is None else _as_3d_panel(Y)
n, d, m = X3.shape
k = Y3.shape[0]
out = np.empty((n, k), dtype=float)
for i in range(n):
a = X3[i].T
for j in range(k):
b = Y3[j].T
out[i, j] = dtw_cost(a, b, window=window)
return out
def dtw_rbf_kernel(
X: np.ndarray,
Y: np.ndarray | None = None,
*,
gamma: float,
window: int | None = None,
) -> np.ndarray:
D = pairwise_dtw_cost(X, Y, window=window)
return np.exp(-float(gamma) * D)
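# The markdown note above warns that this similarity is not guaranteed PSD; a cheap
# empirical check (illustrative, not a proof) is to inspect the smallest eigenvalue
# of a toy Gram matrix built from random windows.
_toy = np.random.default_rng(1).normal(size=(8, 20))  # 8 univariate toy windows
_K_toy = dtw_rbf_kernel(_toy, gamma=0.05, window=3)
print("smallest eigenvalue:", float(np.linalg.eigvalsh(_K_toy).min()))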
def _acf(x: np.ndarray, max_lag: int) -> tuple[np.ndarray, np.ndarray]:
x = np.asarray(x, dtype=float)
x = x - x.mean()
denom = float(np.dot(x, x))
lags = np.arange(max_lag + 1)
values = np.zeros(max_lag + 1)
values[0] = 1.0
if denom == 0.0:
return lags, values
for k in range(1, max_lag + 1):
values[k] = float(np.dot(x[k:], x[:-k]) / denom)
return lags, values
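# Sanity check (illustrative): white noise should show near-zero autocorrelation at
# nonzero lags, roughly within +/- 2/sqrt(n).
_wn = np.random.default_rng(2).normal(size=500)
print(np.round(_acf(_wn, max_lag=5)[1], 3))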
class TimeSeriesSVRTslearn:
"""tslearn-style time-series SVR wrapper.
- For kernel in {linear, poly, rbf, sigmoid}: uses sklearn SVR on flattened windows.
- For kernel='dtw_rbf': uses sklearn SVR with kernel='precomputed' on a DTW-RBF similarity.
"""
def __init__(
self,
*,
C: float = 1.0,
kernel: str = "rbf",
degree: int = 3,
gamma: float | str | None = "scale",
coef0: float = 0.0,
epsilon: float = 0.1,
tol: float = 1e-3,
max_iter: int = -1,
dtw_window: int | None = None,
dtw_gamma: float | None = None,
):
self.C = float(C)
self.kernel = str(kernel)
self.degree = int(degree)
self.gamma = gamma
self.coef0 = float(coef0)
self.epsilon = float(epsilon)
self.tol = float(tol)
self.max_iter = int(max_iter)
self.dtw_window = dtw_window
self.dtw_gamma = dtw_gamma
self.model_: SVR | None = None
self.X_train_: np.ndarray | None = None
self.n_timepoints_: int | None = None
self.n_dims_: int | None = None
self.dtw_gamma_: float | None = None
def fit(self, X, y):
X3 = _as_3d_panel(X)
y = np.asarray(y, dtype=float)
if X3.shape[0] != y.shape[0]:
raise ValueError("X and y must have the same number of samples")
n, d, m = X3.shape
self.n_timepoints_ = int(m)
self.n_dims_ = int(d)
if self.kernel == "dtw_rbf":
self.X_train_ = X3
if self.dtw_gamma is None:
# Median heuristic on a small subset for speed
idx = np.arange(n)
if n > 60:
idx = np.random.default_rng(0).choice(n, size=60, replace=False)
D = pairwise_dtw_cost(X3[idx], window=self.dtw_window)
v = D[np.triu_indices_from(D, k=1)]
v = v[v > 0]
med = float(np.median(v)) if v.size else 1.0
self.dtw_gamma_ = 1.0 / med
else:
self.dtw_gamma_ = float(self.dtw_gamma)
K = dtw_rbf_kernel(X3, gamma=self.dtw_gamma_, window=self.dtw_window)
self.model_ = SVR(
kernel="precomputed",
C=self.C,
epsilon=self.epsilon,
tol=self.tol,
max_iter=self.max_iter,
)
self.model_.fit(K, y)
return self
X_flat = _flatten_panel(X3)
self.model_ = SVR(
kernel=self.kernel,
C=self.C,
degree=self.degree,
gamma=self.gamma,
coef0=self.coef0,
epsilon=self.epsilon,
tol=self.tol,
max_iter=self.max_iter,
)
self.model_.fit(X_flat, y)
return self
def predict(self, X) -> np.ndarray:
if self.model_ is None:
raise RuntimeError("Call fit() before predict().")
X3 = _as_3d_panel(X)
n, d, m = X3.shape
if m != self.n_timepoints_ or d != self.n_dims_:
raise ValueError(
f"X must have shape (n,{self.n_dims_},{self.n_timepoints_}) (or (n,{self.n_timepoints_})), got {X3.shape}"
)
if self.kernel == "dtw_rbf":
if self.X_train_ is None or self.dtw_gamma_ is None:
raise RuntimeError("Model is missing DTW state; refit.")
K = dtw_rbf_kernel(X3, self.X_train_, gamma=self.dtw_gamma_, window=self.dtw_window)
return self.model_.predict(K)
X_flat = _flatten_panel(X3)
return self.model_.predict(X_flat)
Demo: one-step forecasting via sliding windows#
We build a regression dataset from a single multi-seasonal series:
- input \(X_t = (y_{t-L},\dots,y_{t-1})\)
- target \(y_t\)

Then we compare:

- SVR with a DTW-RBF kernel (`kernel="dtw_rbf"`) and
- SVR with a standard RBF kernel on flattened windows.
def simulate_ar1_noise(n: int, *, phi: float, sigma: float, rng: np.random.Generator) -> np.ndarray:
eps = rng.normal(0.0, sigma, size=n)
u = np.zeros(n)
for t in range(1, n):
u[t] = phi * u[t - 1] + eps[t]
return u
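# Illustrative check: a stationary AR(1) has Var(u_t) = sigma^2 / (1 - phi^2), so
# phi=0.6, sigma=0.6 implies a standard deviation of about 0.75.
_u = simulate_ar1_noise(20_000, phi=0.6, sigma=0.6, rng=np.random.default_rng(3))
print("empirical std:", float(np.std(_u)))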
def make_sliding_windows(y: np.ndarray, window_length: int) -> tuple[np.ndarray, np.ndarray]:
y = np.asarray(y, dtype=float)
L = int(window_length)
if y.size <= L:
raise ValueError("y is too short")
X = np.column_stack([y[i : y.size - L + i] for i in range(L)])
y_next = y[L:]
return X, y_next
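# Illustrative check: a length-6 series with window_length=3 yields three windows,
# each paired with the value that immediately follows it.
_Xw, _yw = make_sliding_windows(np.arange(6.0), window_length=3)
print(_Xw)  # rows: [0. 1. 2.], [1. 2. 3.], [2. 3. 4.]
print(_yw)  # [3. 4. 5.]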
n = 420
idx = pd.date_range("2022-01-01", periods=n, freq="D")
t = np.arange(n)
weekly = 1.6 * np.sin(2 * np.pi * t / 7) + 0.4 * np.cos(2 * np.pi * t / 7)
monthly = 1.0 * np.sin(2 * np.pi * t / 30) - 0.3 * np.cos(2 * np.pi * t / 30)
trend = 0.03 * t
noise = simulate_ar1_noise(n, phi=0.6, sigma=0.6, rng=rng)
y = 20.0 + trend + weekly + monthly + noise
y = pd.Series(y, index=idx, name="y")
fig = go.Figure()
fig.add_trace(go.Scatter(x=y.index, y=y, name="y", line=dict(color="black")))
fig.update_layout(title="Synthetic multi-seasonal series", xaxis_title="date", yaxis_title="value")
fig.show()
L = 35
X, y_next = make_sliding_windows(y.to_numpy(), window_length=L)
h = 80
X_train, y_train = X[:-h], y_next[:-h]
X_test, y_test = X[-h:], y_next[-h:]
t_test = y.index[-h:]
# Keep DTW demo small enough to run quickly
n_train_max = 140
if X_train.shape[0] > n_train_max:
X_train_small = X_train[-n_train_max:]
y_train_small = y_train[-n_train_max:]
else:
X_train_small = X_train
y_train_small = y_train
svr_dtw = TimeSeriesSVRTslearn(
C=10.0,
kernel="dtw_rbf",
epsilon=0.2,
dtw_window=10,
dtw_gamma=None,
).fit(X_train_small, y_train_small)
pred_dtw = svr_dtw.predict(X_test)
svr_rbf = TimeSeriesSVRTslearn(
C=10.0,
kernel="rbf",
gamma="scale",
epsilon=0.2,
).fit(X_train, y_train)
pred_rbf = svr_rbf.predict(X_test)
def summarize(name: str, y_true: np.ndarray, y_pred: np.ndarray) -> None:
mae = mean_absolute_error(y_true, y_pred)
    rmse = float(np.sqrt(mean_squared_error(y_true, y_pred)))  # `squared=False` was removed in scikit-learn 1.6
r2 = r2_score(y_true, y_pred)
print(f"{name}: MAE={mae:.3f} RMSE={rmse:.3f} R^2={r2:.3f}")
summarize("SVR(DTW-RBF)", y_test, pred_dtw)
summarize("SVR(RBF on lags)", y_test, pred_rbf)
fig = go.Figure()
fig.add_trace(go.Scatter(x=t_test, y=y_test, name="actual", line=dict(color="black")))
fig.add_trace(go.Scatter(x=t_test, y=pred_dtw, name="SVR(DTW-RBF)", line=dict(color="#4E79A7")))
fig.add_trace(go.Scatter(x=t_test, y=pred_rbf, name="SVR(RBF)", line=dict(color="#E15759")))
fig.update_layout(title="One-step-ahead predictions", xaxis_title="date", yaxis_title="value")
fig.show()
# Residual diagnostics (DTW model)
resid = y_test - pred_dtw
print("residual mean:", float(np.mean(resid)))
print("residual std:", float(np.std(resid, ddof=1)))
print("Jarque-Bera:", stats.jarque_bera(resid))
lags, acf_vals = _acf(resid, max_lag=30)
bound = 1.96 / np.sqrt(resid.size)
# QQ data
nq = resid.size
p = (np.arange(1, nq + 1) - 0.5) / nq
theoretical = stats.norm.ppf(p)
sample_q = np.sort((resid - resid.mean()) / resid.std(ddof=1))
fig = make_subplots(
rows=2,
cols=2,
subplot_titles=("Residuals over time", "Residual histogram", "Residual ACF", "QQ plot (std residuals)"),
)
fig.add_trace(go.Scatter(x=t_test, y=resid, name="residuals", line=dict(color="#4E79A7")), row=1, col=1)
fig.add_hline(y=0, line=dict(color="black", dash="dash"), row=1, col=1)
fig.add_trace(go.Histogram(x=resid, nbinsx=25, name="hist", marker_color="#4E79A7"), row=1, col=2)
fig.add_trace(go.Bar(x=lags, y=acf_vals, name="ACF(resid)", marker_color="#4E79A7"), row=2, col=1)
fig.add_trace(go.Scatter(x=[0, lags.max()], y=[bound, bound], mode="lines", line=dict(color="gray", dash="dash"), showlegend=False), row=2, col=1)
fig.add_trace(go.Scatter(x=[0, lags.max()], y=[-bound, -bound], mode="lines", line=dict(color="gray", dash="dash"), showlegend=False), row=2, col=1)
fig.add_trace(go.Scatter(x=theoretical, y=sample_q, mode="markers", name="QQ", marker=dict(color="#4E79A7")), row=2, col=2)
fig.add_trace(
go.Scatter(x=[theoretical.min(), theoretical.max()], y=[theoretical.min(), theoretical.max()], mode="lines", line=dict(color="black", dash="dash"), showlegend=False),
row=2,
col=2,
)
fig.update_layout(height=750, title="Residual diagnostics (SVR DTW-RBF)")
fig.show()
residual mean: -0.24839058285380267
residual std: 3.4313360450269337
Jarque-Bera: SignificanceResult(statistic=3.9226931240248826, pvalue=0.14066887396930364)